PyBroMo 5. Two-state dynamics - Dynamic smFRET simulation

This notebook is part of PyBroMo a python-based single-molecule Brownian motion diffusion simulator that simulates confocal smFRET experiments.

Overview

In this notebook we simulate a freely-diffusing smFRET experiment with dynamics between two states. The input are two smFRET files, one for each static state. These input files need to be simulations from the same particles trajectories but with different E*.

Load static FRET data


In [ ]:
from pathlib import Path
from textwrap import dedent, indent
import numpy as np
import tables
from scipy.stats import expon
import phconvert as phc
print('phconvert version:', phc.__version__)

In [ ]:
SIM_PATH = 'data/'

In [ ]:
filelist = list(Path(SIM_PATH).glob('smFRET_*_600s.hdf5'))
[f.name for f in filelist]

In [ ]:
filename_a = str([f for f in filelist if '11_E_40_Em' in f.name][0])
filename_b = str([f for f in filelist if '11_E_75_Em' in f.name][0])
filename_a, filename_b

In [ ]:
da = tables.open_file(filename_a)
db = tables.open_file(filename_b)

In [ ]:
da.filename

In [ ]:
db.filename

In [ ]:
print(da.root.description.read().decode())

In [ ]:
print(db.root.description.read().decode())

In [ ]:
# Make sure files a re using the same trajectories
assert da.root.description.read().decode().split('\n')[5] == db.root.description.read().decode().split('\n')[5]

Timestamps, detectors and particles for the two states


In [ ]:
# Timestamps
times_a = da.root.photon_data.timestamps.read()
times_b = db.root.photon_data.timestamps.read()

# Detectors
det_a = da.root.photon_data.detectors.read()
det_b = db.root.photon_data.detectors.read()

# Particle number for each timestamp
par_a = da.root.photon_data.particles.read()
par_b = db.root.photon_data.particles.read()

In [ ]:
par_a

In [ ]:
acquisition_duration = da.root.acquisition_duration.read()
assert acquisition_duration == db.root.acquisition_duration.read()
print('Acquisition duration: %d s' % acquisition_duration)

In [ ]:
times_unit = da.root.photon_data.timestamps_specs.timestamps_unit.read()
times_unit

Simulation

Mean residence times for the two states a and b


In [ ]:
tau_a = 1e-3 / 5
tau_b = 0.5e-3 / 5

tau_s = [tau_a, tau_b]

Exponential distributions of residence times for the two states


In [ ]:
expon_s = tuple(expon(scale=tau / times_unit) for tau in tau_s)

In [ ]:
n = int(1.1 * acquisition_duration / (tau_a + tau_b))  # Number of transitions (upper limit)

Define functions


In [ ]:
def sim_two_states_single_particle(times_s, taus_s):
    """Simulate 2-state transitions for a single particle.
    
    Arguments:
        times_s (tuple or arrays): 2-tuple of timestamps arrays
            for the two states a (times_s[0]) and b (times_s[1]).
        taus_s (tuple or arrays): 2-tuple of residence times arrays
            for the two states a (taus_s[0]) and b (taus_s[1]).
    
    Returns:
        List of index pairs. Each pair is a start/stop index for 
        for the timestamps of current state for a specific residence time. 
        The states are strictly alternating starting from 0 (i.e. a).
        
        - first pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state) 
          corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][0]`
        - second pair: (state = 1) start/stop index for array `times_s[1]` (where 1 = state) 
          corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[1][0]`
        - third pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state) 
          corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][1]`
          
        and so on.
    """
    slices_list = []

    index_s = [0, 0]          # indexes for looping thorugh the timestamps arrays
    index_start_s = [0, 0]    # indexes of current state start in each timestamps array
    index_tau_s = [0, 0]      # index of current time window duration
    t_start = 0               # time of current state start

    state, otherstate = 0, 1
    while ((index_s[0] < len(times_s[0]) - 1) and
           (index_s[1] < len(times_s[1]) - 1)):
        # Duration of current time window (i.e. duration of current state)
        tau = taus_s[state][index_tau_s[state]]

        # Find timestamps in current time window 
        # for both timestamps arrays
        for state_i in (0, 1):
            times = times_s[state_i]
            delta_t = times[index_s[state_i]] -  t_start
            while delta_t < tau and index_s[state_i] < len(times) - 1:
                index_s[state_i] += 1
                delta_t = times[index_s[state_i]] - t_start
                #print(state, index_s[state])
                
        # Save the timestamps only for current state
        slices_list.append((index_start_s[state], index_s[state]))

        # Save index of first timestamp in next time window
        index_start_s = index_s.copy()
        
        # Discard current "used" tau
        index_tau_s[state] += 1

        # Switch to a new state
        t_start += tau
        state, otherstate = otherstate, state
    return slices_list

In [ ]:
def sim_two_states(num_particles, times_states, det_states, par_states, times_unit, expon_s, seed=1):
    """Simulate 2-state transitions for a set of particles.
    
    Arguments:
        num_particles (int): number of simulated particles.
        times_states (tuple of arrays): 2-tuple of timestamps arrays, one for each state
        det_states (tuple of arrays): 2-tuple of detectors arrays, one for each state
        par_states (tuple of arrays): 2-tuple of particles arrays, one for each state
        times_unit (float): timestamps unit in seconds.
        expon_s (tuple of scipy.stats distributions): 2-tuple of exponential distributions
            used to simulate residency times for each state. Each element is a frozen
            `scipy.stats.expon` distribution with scale parameter set according to the 
            residency time for the corresponding state.
    
    Returns:
        Tuple of 2 lists:
        
        - List of timestamps arrays, one for each particle, after 2-states dynamics simulation.
        - List of detectors arrays, one for each particle, after 2-states dynamics simulation.
    """
    np.random.seed(seed)
    times_p = []
    det_p = []
    for particle in range(num_particles):
        print('\n- Simulating particle %d: ' % particle, end='', flush=True)

        # Get timestamps and detectors for current particle in each state
        print('timestamps..', end='', flush=True)
        masks_states = [par == particle for par in par_states]
        times_s = [memoryview(t_par[mask_par]) for t_par, mask_par in zip(times_states, masks_states)]
        det_s = [memoryview(det_par[mask_par]) for det_par, mask_par in zip(det_states, masks_states)]
        print('[done] ', end='', flush=True)

        # Simulate residence times
        print('residence..', end='', flush=True)
        taus_s = [memoryview(exp_dist.rvs(n)) for exp_dist in expon_s]
        sim_duration = np.sum(np.sum(taus) for taus in taus_s) *  times_unit
        assert sim_duration > acquisition_duration
        print('[done] ', end='', flush=True)

        # Compute start/stop indexes for the timestamps for each residence time
        print('transition-index..', end='', flush=True)
        slices_list = sim_two_states_single_particle(times_s, taus_s)
        print('[done] ', end='', flush=True)

        # Create new timestamps and detectors to store dynamics simulation results 
        print('merge..', end='', flush=True)
        times_size = sum([s[1] - s[0] for s in slices_list])
        times = np.zeros(times_size, dtype='int64')
        det = np.zeros(times_size, dtype='uint8')
        par = np.ones(times_size, dtype='uint8') * particle
        times_m = memoryview(times)
        det_m = memoryview(det)

        # istart, istop are indexes of times_m/det_m while the
        # start, stop indexes in slices_list refer to `times_s[state]`
        # where state = 0 for odd elements and state = 1 for even elements.
        # See `sim_two_states_single_particle()` for more info on `slice_list`.
        istart = 0
        state, otherstate = 0, 1
        for start, stop in slices_list:
            istop = istart + stop - start
            times_m[istart:istop] = times_s[state][start:stop]
            det_m[istart:istop] = det_s[state][start:stop]
            istart = istop
            state, otherstate = otherstate, state

        print('[done]', flush=True)
        assert (times != 0).all()
        times_p.append(times)
        det_p.append(det)
    return times_p, det_p

Simulate dynamics


In [ ]:
seed = 987123654  # random number generator seed

times_p, det_p = sim_two_states(35, (times_a, times_b), (det_a, det_b), (par_a, par_b), 
                                times_unit=times_unit, expon_s=expon_s, seed=seed)

In [ ]:
assert all(all(np.diff(t) >= 0) for t in times_p)
assert len(times_p) == len(det_p) == 35

In [ ]:
det_p[0][:10]

In [ ]:
det_p[1][:10]

In [ ]:
times_p[0][:10]

In [ ]:
times_p[1][:10]

Add background

Background is the same Poisson process in the two static files and it is saved as a virtual particle (index = 35, the "36th" virtual particle). Let's take the background from file a for simplicity.


In [ ]:
times_a[par_a == 35]

In [ ]:
det_a[par_a == 35]

In [ ]:
times_p.append(times_a[par_a == 35])
det_p.append(det_a[par_a == 35])

Merge arrays from individual particles


In [ ]:
times_dyn = np.hstack(times_p)
det_dyn = np.hstack(det_p)

In [ ]:
argsort = times_dyn.argsort(kind='mergesort')
times_dyn = times_dyn[argsort]
det_dyn = det_dyn[argsort]
det_dyn

In [ ]:
par_dyn = np.hstack([det_p_i.size * [idx] for idx, det_p_i in enumerate(det_p)])
assert par_dyn.shape[0] == sum(d.size for d in det_p)
par_dyn = par_dyn[argsort]

Save data to Photon-HDF5


In [ ]:
def make_photon_hdf5(times, det, par, times_unit, description, identity=None):
    photon_data = dict(
        timestamps = times,
        timestamps_specs = dict(timestamps_unit=times_unit),
        detectors = det,
        particles = par,
        measurement_specs = dict(
            measurement_type = 'smFRET',
            detectors_specs = dict(spectral_ch1 = np.atleast_1d(0),
                                   spectral_ch2 = np.atleast_1d(1))))

    setup = dict(
        num_pixels = 2,
        num_spots = 1,
        num_spectral_ch = 2,
        num_polarization_ch = 1,
        num_split_ch = 1,
        modulated_excitation = False,
        lifetime = False,
        excitation_alternated=(False,),
        excitation_cw=(True,))

    provenance = dict(software='', software_version='', filename='')

    if identity is None:
        identity = dict()

    description = description
    acquisition_duration = np.round((times[-1] - times[0]) * times_unit)
    data = dict(
        acquisition_duration = round(acquisition_duration),
        description = description,
        photon_data = photon_data,
        setup=setup,
        provenance=provenance,
        identity=identity)
    return data

Create description string


In [ ]:
da.filename

In [ ]:
db.filename

In [ ]:
traj_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[4:7]))
print(traj_descr)

In [ ]:
part_D_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[9:11])), '    ')
print(part_D_descr)

In [ ]:
state0_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[11:13])), '    ')
print(state0_descr)

In [ ]:
state1_descr = indent(dedent('\n'.join(db.root.description.read().decode().split('\n')[11:13])), '    ')
print(state1_descr)

In [ ]:
bg_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[-4:]))
print(bg_descr)

In [ ]:
tau_a_ms = tau_a * 1e3
tau_b_ms = tau_b * 1e3

tau_a_us = tau_a * 1e6
tau_b_us = tau_b * 1e6

In [ ]:
filename_a_name = Path(filename_a).name
filename_b_name = Path(filename_b).name

In [ ]:
description = f"""\
PyBroMo simulation of 2-states dynamics
----------------------------------------

{traj_descr}
{part_D_descr}

State 0:
    Residency time: {tau_a_ms} ms
{state0_descr}
    filename: {filename_a_name}
    
State 1:
    Residency time: {tau_b_ms} ms
{state0_descr}
    filename: {filename_b_name}
    
{bg_descr}
"""
print(description)

Save file


In [ ]:
identity=dict(author='Antonino Ingargiola', 
              author_affiliation='UCLA')

In [ ]:
data = make_photon_hdf5(times_dyn, det_dyn, par_dyn, times_unit, description, identity=identity)
data

In [ ]:
h5_fname = f'smFRET_0eb9b3_P_35_s0_D_2.5e-11_dynamics_E_40-75_Tau_{tau_a_us:.0f}-{tau_b_us:.0f}us_EmTot_226k-200k_BgD900_BgA600_t_max_600s.hdf5'
h5_fname

In [ ]:
phc.hdf5.save_photon_hdf5(data, h5_fname=h5_fname, overwrite=True)